package com.paic.common.middleware.util;

import java.io.*;
import java.net.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;

/**
 * HDFS工具类 Author: 菩提树下的杨过(http://yjmyzz.cnblogs.com) Since: 2015-05-21
 */
public class HDFSUtil {

    private HDFSUtil() {

    }

    /**
     * 判断路径是否存在
     *
     * @param conf Configuration句柄
     * @param path HDFS路径
     * @return 路径是否存在
     */
    public static boolean exits(Configuration conf, String path)
            throws IOException {
        FileSystem fs = FileSystem.get(URI.create(path), conf);
        return fs.exists(new Path(path));
    }

    public static String checkPathExist(Configuration conf, String path)
            throws IOException {
        FileSystem fs = FileSystem.get(URI.create(path), conf);
        if (fs.exists(new Path(path))) {
            return "1";
        }

        return null;
    }

    /**
     * 创建文件
     *
     * @param conf     Configuration句柄
     * @param filePath 文件目录
     * @param contents 文件内容
     * @throws IOException
     */
    public static void createFile(Configuration conf, String filePath,
                                  byte[] contents) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(filePath), conf);
        Path path = new Path(filePath);
        FSDataOutputStream outputStream = fs.create(path);
        outputStream.write(contents);
        outputStream.close();
        fs.close();
    }

    /**
     * 创建文件
     *
     * @param conf
     * @param filePath
     * @param fileContent
     * @throws IOException
     */
    public static void createFile(Configuration conf, String filePath,
                                  String fileContent) throws IOException {
        createFile(conf, filePath, fileContent.getBytes());
    }

    /**
     * @param conf
     * @param localFilePath
     * @param remoteFilePath
     * @throws IOException
     */
    public static void copyFromLocalFile(Configuration conf,
                                         String localFilePath, String remoteFilePath) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(remoteFilePath), conf);
        Path localPath = new Path(localFilePath);
        Path remotePath = new Path(remoteFilePath);
        fs.copyFromLocalFile(true, true, localPath, remotePath);
        fs.close();
    }

    /**
     * 删除目录或文件
     *
     * @param conf
     * @param remoteFilePath
     * @param recursive
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(Configuration conf, String remoteFilePath,
                                     boolean recursive) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(remoteFilePath), conf);
        boolean result = fs.delete(new Path(remoteFilePath), recursive);
        fs.close();
        return result;
    }

    /**
     * 删除目录或文件(如果有子目录,则级联删除)
     *
     * @param conf
     * @param remoteFilePath
     * @return
     * @throws IOException
     */
    public static boolean deleteFile(Configuration conf, String remoteFilePath)
            throws IOException {
        return deleteFile(conf, remoteFilePath, true);
    }

    /**
     * 文件重命名
     *
     * @param conf
     * @param oldFileName
     * @param newFileName
     * @return
     * @throws IOException
     */
    public static boolean renameFile(Configuration conf, String oldFileName,
                                     String newFileName) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(newFileName), conf);
        Path oldPath = new Path(oldFileName);
        Path newPath = new Path(newFileName);
        boolean result = fs.rename(oldPath, newPath);
        fs.close();
        return result;
    }

    /**
     * 列出指定目录下的文件\子目录信息（非递归）
     *
     * @param conf
     * @param dirPath
     * @return
     * @throws IOException
     */
    public static FileStatus[] listStatus(Configuration conf, String dirPath)
            throws IOException {
        FileSystem fs = FileSystem.get(URI.create(dirPath), conf);
        FileStatus[] fileStatuses = fs.globStatus(new Path(dirPath));
        // fs.close();
        return fileStatuses;
    }

    /**
     * 创建目录
     *
     * @param conf
     * @param dirName
     * @return
     * @throws IOException
     */
    public static boolean createDirectory(Configuration conf, String dirName)
            throws IOException {
        FileSystem fs = FileSystem.get(URI.create(dirName), conf);
        Path dir = new Path(dirName);
        boolean result = fs.mkdirs(dir);
        fs.close();
        return result;
    }

    /**
     * 返回目录下所有文件
     * @param hdfsFilePath
     *            目录路径
     * @return
     *            目录下所有文件路径（递归）
     */
    public static List<String> getFileListFromPath(Configuration conf, String hdfsFilePath, boolean recursive) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsFilePath), conf);
        Path path = new Path(hdfsFilePath);
        return HDFSUtil.iteratorShowFiles(fs, path, recursive);
    }

    /**
     * 递归的遍历目录下所有文件（包括子目录）
     * @param hdfs
     *            FileSystem 对象
     * @param path
     *            文件路径
     */
    public static List<String> iteratorShowFiles(FileSystem hdfs, Path path, boolean recursive) {
        List<String> fileList = new ArrayList<String>();
        try {
            if (hdfs == null || path == null) {
                return null;
            }
            // 获取文件列表
            FileStatus[] files = hdfs.listStatus(path);

            // 展示文件信息
            for (int i = 0; i < files.length; i++) {
                try {
                    if (recursive && files[i].isDirectory()) {
                        // 递归调用
                        fileList.addAll(iteratorShowFiles(hdfs, files[i].getPath(), recursive));
                    } else if (files[i].isFile()) {
                        fileList.add(files[i].getPath().toString());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        return fileList;
    }

    /**
     * 读取文件内容
     *
     * @param conf
     * @param filePath
     * @return
     * @throws IOException
     */
    public static String readFile(Configuration conf, String filePath)
            throws IOException {
        String fileContent = null;
        FileSystem fs = FileSystem.get(URI.create(filePath), conf);
        Path path = new Path(filePath);
        InputStream inputStream = null;
        ByteArrayOutputStream outputStream = null;
        try {
            inputStream = fs.open(path);
            outputStream = new ByteArrayOutputStream(inputStream.available());
            IOUtils.copyBytes(inputStream, outputStream, conf);
            fileContent = outputStream.toString();
        } finally {
            IOUtils.closeStream(inputStream);
            IOUtils.closeStream(outputStream);
            fs.close();
        }
        return fileContent;
    }

    public static void recoverLease(String path) throws IOException {
        DistributedFileSystem fs = new DistributedFileSystem();
        Configuration conf = new Configuration();
        fs.initialize(URI.create(path), conf);
        fs.recoverLease(new Path(path));
        fs.close();
    }

    private static Path getResolvedPath(String dir) throws IOException {
        Configuration conf = new Configuration();
        Path dirPath = new Path(dir);
        FileSystem fs = dirPath.getFileSystem(conf);
        return fs.resolvePath(dirPath);
    }

    private static URI getDFSHttpAddress(Path target) throws IOException {
        //String nnAddress = null;
        Configuration conf = new Configuration();

        //get the filesystem object to verify it is an HDFS system
        final FileSystem fs = target.getFileSystem(conf);
        if (!(fs instanceof DistributedFileSystem)) {
            System.err.println("FileSystem is " + fs.getUri());
            return null;
        }

        return DFSUtil.getInfoServer(HAUtil.getAddressOfActive(fs), conf,
                DFSUtil.getHttpClientScheme(conf));
    }

    @SuppressWarnings("unused")
	private static String getNameNodeAddress() throws IOException {
        Configuration conf = new Configuration(false);
        FileSystem fs = FileSystem.get(conf);
        InetSocketAddress active = HAUtil.getAddressOfActive(fs);
        return active.getHostName() + ":" + active.getPort();
    }

    public static Set<String> getOpenforwriteFileList(String dir) throws IOException {
        /*拼接URL地址，发送给namenode监听的dfs.namenode.http-address端口，获取所需数据*/
        StringBuilder url = new StringBuilder();
        url.append("/fsck?ugi=").append("dev");
        url.append("&openforwrite=1");

        /*获得namenode的主机名以及dfs.namenode.http-address监听端口，例如：http://hadoopnode1:50070*/
        Path dirpath;
        URI namenodeAddress;
        dirpath = HDFSUtil.getResolvedPath(dir);
        namenodeAddress = HDFSUtil.getDFSHttpAddress(dirpath);

        url.insert(0, namenodeAddress);
        try {
            url.append("&path=").append(URLEncoder.encode(
                    Path.getPathWithoutSchemeAndAuthority(new Path(dir)).toString(), "UTF-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        Configuration conf = new Configuration();
        URLConnectionFactory connectionFactory = URLConnectionFactory.newDefaultURLConnectionFactory(conf);
        URL path = null;
        try {
            path = new URL(url.toString());
        } catch (MalformedURLException e) {
            e.printStackTrace();
        }

        URLConnection connection;
        BufferedReader input = null;
        try {
            connection = connectionFactory.openConnection(path, UserGroupInformation.isSecurityEnabled());
            InputStream stream = connection.getInputStream();
            input = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
        } catch (IOException | AuthenticationException e) {
            e.printStackTrace();
        }

        if (input == null) {
            System.err.println("Cannot get response from namenode, url = " + url);
            return null;
        }

        String line;
        Set<String> resultSet = new HashSet<>();
        try {
            while ((line = input.readLine()) != null) {
                if (line.contains("MISSING") || line.contains("OPENFORWRITE")) {
                    String regEx = "/[^ ]*";
                    Pattern pattern = Pattern.compile(regEx);
                    Matcher matcher = pattern.matcher(line);
                    while (matcher.find()) {
                        resultSet.add(matcher.group().replaceAll(":", ""));
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            input.close();
        }

        return resultSet;

    }

    public static void main(String[] args) throws IOException {
//        Set<String> set = HDFSUtil.getOpenforwriteFileList("hdfs://10.29.180.177:8020/report");
//        if (set == null) {
//            return;
//        }
//
//        for (String filePath : set) {
//            if (filePath.contains("2017/07/13")) {
//                System.out.println(filePath);
//                HDFSUtil.recoverLease("hdfs://10.29.180.177:8020" + filePath);
//
//            }
//        }
//        System.out.println("getOpenforwriteFileList over");




    }


}